Improvement locking Iceberg iterator#1436

Merged
zvonand merged 2 commits intoantalya-25.8from
feature/antalya-25.8/do_not_lock_iterator
Mar 9, 2026

Conversation

@ianton-ru

Changelog category (leave one):

  • Performance Improvement
    Improvement locking Iceberg iterator

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

What is going on:
In an IcebergCluster request, each replica creates a pool of threads, and each thread tries to get the next task. Each thread calls a callback:

std::lock_guard lock(callback_mutex);
sendReadTaskRequest();
auto res = receiveClusterFunctionReadTaskResponse(*query_state);

All threads share the same socket, so only one thread sends a request at a time while all the others wait on the mutex.

Requests from each replica are sent to the initiator. The initiator processes them in multiple threads, but all of them call StorageObjectStorageStableTaskDistributor::getNextTask. This method calls three other methods, each of which locks a mutex internally:

    auto file = getPreQueuedFile(number_of_current_replica);
    if (!file)
        file = getMatchingFileFromIterator(number_of_current_replica);
    if (!file)
        file = getAnyUnprocessedFile(number_of_current_replica);

getPreQueuedFile tries to find a path for the replica among the already extracted paths.
getMatchingFileFromIterator calls IcebergIterator::next to extract the next path and computes which replica it belongs to: if it belongs to the requesting replica, the path is sent as the response; if it belongs to another replica, it is put into a list for getPreQueuedFile.
getAnyUnprocessedFile is executed only when all paths have been extracted and nothing is left for the current replica.

The problem appears when getMatchingFileFromIterator calls IcebergIterator::next.

IcebergIterator::next is built over a ConcurrentBoundedQueue: next waits until something is pushed into the queue or the queue is finished. So while the queue is empty, all threads wait until something happens.

In a separate thread, IcebergIterator calls SingleThreadIcebergKeysIterator::next to extract the next path from Iceberg metadata. This can take a long time because of reading metadata from S3, pruning files, etc.

Possible scenario:

  • Replica 1 sends a request. The initiator executes StorageObjectStorageStableTaskDistributor::getNextTask, finds a path for replica 2 and puts it into the list, then finds a file for replica 1 and returns it.
  • Replica 1 sends a request again. The initiator waits a long time for the next object: all remaining objects are pruned inside SingleThreadIcebergKeysIterator::next, which can take dozens of seconds.
  • Replica 2 sends a request. StorageObjectStorageStableTaskDistributor already has an object for replica 2, but it stays locked until SingleThreadIcebergKeysIterator finishes its work.

As a result, replica 2 gets its path only after all metadata has been processed.

Solution:

IcebergIterator is built over a ConcurrentBoundedQueue, so its next method is thread-safe. It can be called without locking StorageObjectStorageStableTaskDistributor, and replica 2 can get a prequeued path without waiting for SingleThreadIcebergKeysIterator::next.

CI/CD Options

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

Regression jobs to run:

  • Fast suites (mostly <1h)
  • Aggregate Functions (2h)
  • Alter (1.5h)
  • Benchmark (30m)
  • ClickHouse Keeper (1h)
  • Iceberg (2h)
  • LDAP (1h)
  • Parquet (1.5h)
  • RBAC (1.5h)
  • SSL Server (1h)
  • S3 (2h)
  • Tiered Storage (2h)

@ianton-ru
Author

@codex review

@github-actions

github-actions bot commented Feb 20, 2026

Workflow [PR], commit [51762a7]

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. You're on a roll.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@zvonand
Collaborator

zvonand commented Feb 27, 2026

Why is there an antalya-26.1 label? This is a PR to 25.8.

@ianton-ru
Author

The change makes sense for 26.1 too, but it is good to confirm that it fixes the issue before porting.

@zvonand
Collaborator

zvonand commented Mar 2, 2026

We're trying to abandon 25.8 :) We release it as is (hopefully, today/tomorrow), idk if we will do further releases

So I guess it is not a priority now (to verify something against 25.8, having in mind it will have to be re-verified later for 26.1)

Collaborator

@arthurpassos arthurpassos left a comment


LGTM, left a few nit comments.

Did you benchmark it to see if there is any practical difference?

virtual ObjectInfoPtr next(size_t) = 0;
virtual size_t estimatedKeysCount() = 0;
virtual std::optional<UInt64> getSnapshotVersion() const { return std::nullopt; }
virtual bool has_concurrent_next() const { return false; }

Please add docs, it is not clear to the reader of this API what has_concurrent_next means. Plus, shouldn't it be hasConcurrentNext?


Perhaps something more descriptive like hasLockingMechanismInNextOperation

{
ObjectInfoPtr object_info;

if (iterator->has_concurrent_next())

I would add a comment similar to the below:

In case IObjectIterator::hasConcurrentNext is true, a locking mechanism is already in place in IObjectIterator::next. Therefore, there is no need to guard it with StorageObjectStorageStableTaskDistributor::mutex


@zvonand
Collaborator

zvonand commented Mar 5, 2026

Likely this was fixed in the branch. Let's try updating to the latest changes.

@zvonand zvonand merged commit 4edabba into antalya-25.8 Mar 9, 2026
1166 of 1177 checks passed
@alsugiliazova
Member

Audit Report: PR #1436

Low

  • Iterator concurrency capability is not forwarded by wrapper iterators
    • Impact: PR #1436 lock-elision optimization can be silently disabled in wrapped iterator paths, so next() remains serialized under distributor mutex and latency improvement is lost.
    • Anchor: IObjectIterator wrappers (ObjectIteratorWithPathAndFileFilter, ObjectIteratorSplitByBuckets) + StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator.
    • Trigger: Iceberg iterator is wrapped before entering task distributor (filter or bucket-split paths).
    • Why this is a defect: The distributor checks iterator->has_concurrent_next(), but the wrappers keep the default false instead of forwarding to the wrapped iterator, so behavior differs between otherwise equivalent wrapped and unwrapped flows.
    • Fix direction: Override and forward has_concurrent_next() in both wrapper classes.
    • Regression test direction: Assert wrapped IcebergIterator still reports has_concurrent_next()==true and follows unlocked path.
@@ -154,6 +154,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
     {
         ObjectInfoPtr object_info;

+        if (iterator->has_concurrent_next())
+        {
+            object_info = iterator->next(0);
+
+            if (!object_info)
+            {
+                LOG_TEST(log, "Iterator is exhausted");
+                std::lock_guard lock(mutex);
+                iterator_exhausted = true;
+                break;
+            }
+        }
+        else
         {
             std::lock_guard lock(mutex);
             object_info = iterator->next(0);
struct IObjectIterator
{
    virtual ~IObjectIterator() = default;
    virtual ObjectInfoPtr next(size_t) = 0;
    virtual size_t estimatedKeysCount() = 0;
    virtual std::optional<UInt64> getSnapshotVersion() const { return std::nullopt; }
};

@alsugiliazova
Member

alsugiliazova commented Mar 9, 2026

Audit Report: PR #1436 Metric Attribution

Scope: IcebergIteratorNextMicroseconds observability behavior between system.events and system.query_log.

Confirmed defect

Medium

  • IcebergIteratorNextMicroseconds is not attributed to query-level ProfileEvents
    • Impact: query-level monitoring and troubleshooting are misleading. system.events shows non-zero iterator time while system.query_log.ProfileEvents['IcebergIteratorNextMicroseconds'] remains zero, hiding per-query cost.
    • Anchor: src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp, src/Interpreters/ProcessList.cpp.
    • Trigger: Iceberg query that uses IcebergIterator producer thread.
    • Why this is a defect: the metric is incremented in work executed by a ThreadFromGlobalPool thread that is not attached to the query thread group; system.query_log snapshots thread-group counters only.
    • Smallest logical reproduction:
      1. Run Iceberg query that scans metadata/manifest files.
      2. Check system.events for IcebergIteratorNextMicroseconds -> increases.
      3. Check system.query_log sum for ProfileEvents['IcebergIteratorNextMicroseconds'] -> remains zero.
    • Likely fix direction:
      • Capture thread_group = CurrentThread::getGroup() before scheduling producer.
      • Attach producer thread with ThreadGroupSwitcher inside lambda.
      • Keep existing exception and queue semantics unchanged.
    • Regression test direction:
      • Integration test: run one Iceberg query, then verify corresponding system.query_log record has non-zero ProfileEvents['IcebergIteratorNextMicroseconds'] when work is non-trivial.
      • Keep global system.events delta check for sanity.
    • Blast radius: Iceberg query observability; affects profiling dashboards and query-level performance triage.

Evidence

Producer thread created without thread-group attachment:

producer_task.emplace(
    [this]()
    {
        while (!blocking_queue.isFinished())
        {
            std::optional<ManifestFileEntryPtr> entry;
            try
            {
                entry = data_files_iterator.next();

Query log takes counters from query thread group:

if (thread_group)
{
    res.memory_usage = thread_group->memory_tracker.get();
    res.peak_memory_usage = thread_group->memory_tracker.getPeak();

    if (get_profile_events)
        res.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
}

Known good pattern (attach background worker to query group):

thread = ThreadFromGlobalPool(
    [&, thread_group = DB::CurrentThread::getGroup()]
    {
        /// Attach to current query thread group, to be able to
        /// have query id in logs and metrics from scanDataFunc.
        DB::ThreadGroupSwitcher switcher(thread_group, DB::ThreadName::DATALAKE_TABLE_SNAPSHOT);
        scanDataFunc();
    });

Confidence

  • Overall confidence: High
    • The discrepancy follows directly from counter ownership: global counters vs query thread-group counters.
    • Code path shows missing thread-group attachment in Iceberg producer thread.

@alsugiliazova alsugiliazova added the verified-with-issue Verified by QA and issue(s) found. label Mar 9, 2026
@alsugiliazova
Member

PR #1436 CI Verification Report

MasterCI Results

84 success, 2 failures, 2 pending/running, 3 skipped (91 total)

| # | Suite | Failing Test | Error | Related to PR | Flakiness (90d) |
|---|-------|--------------|-------|---------------|-----------------|
| 1 | Stateless (amd_tsan, s3, parallel) | 01042_check_query_and_last_granule_size | Result differs with reference | No | 6 fails / 6 days |
| 2 | Integration (amd_tsan, 5/6) | test_dns_cache/test.py::test_user_access_ip_change[node5] | DNS cache test failure | No | 2 fails / 2 days |

Both MasterCI failures are known flaky tests unrelated to Iceberg iterator locking.

Regression Tests (clickhouse-regression)

126 success, 3 failures, 1 pending (130 total)

| Suite | Platform | Status | Pre-existing on Parent |
|-------|----------|--------|------------------------|
| swarms | Release | Fail | Yes (also failing on 7159a04) |
| swarms | Aarch64 | Fail | No (was success on parent) |
| tiered_storage_local | Aarch64 | Fail | No (was success on parent) |
  • swarms (Release): Pre-existing failure, already failing on parent commit 7159a04.
  • swarms (Aarch64) and tiered_storage_local (Aarch64): Passed on parent but failed on merge commit. However, both are completely unrelated to the PR changes (Iceberg iterator locking). These are generic "Process completed with exit code 1" infrastructure-level failures with no specific test failure details. The swarms and tiered_storage suites do not test Iceberg iterator behavior.

Iceberg-Specific Regression Tests

All 4 Iceberg regression suites PASSED:

| Suite | Platform | Status |
|-------|----------|--------|
| Iceberg (1) | Release | Success |
| Iceberg (2) | Release | Success |
| Iceberg (1) | Aarch64 | Success |
| Iceberg (2) | Aarch64 | Success |

Other CI Failures

| Check | Status | Related |
|-------|--------|---------|
| GrypeScanServer (-alpine) / Grype Scan | Fail | No - security vulnerability scan, unrelated to code changes |

@ianton-ru ianton-ru mentioned this pull request Mar 10, 2026

Labels

antalya antalya-25.8 verified-with-issue Verified by QA and issue(s) found.
